/**
 * FileName: StructuredStreamingJoinRedis
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/02/20 10:25
 * Description: Reads streaming text data from HDFS, joins it with a table stored in Redis,
 *              and writes the joined result back to HDFS.
 */
package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.process.chain.ProcessChain;
import cn.com.bonc.process.impl.RuleActionProcessImpl;
import cn.com.bonc.process.impl.SingleColum2MutiProcessImpl;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import static org.apache.spark.sql.functions.*;

/**
 * Structured Streaming job that joins text records arriving under an HDFS directory with a
 * Redis-backed table and appends the joined rows to HDFS as pipe-delimited text.
 *
 * <p>Pipeline: stream text files from {@link #SOURCE_PATH}, run them through the project
 * {@link ProcessChain} ({@link RuleActionProcessImpl}, then {@link SingleColum2MutiProcessImpl}),
 * inner-join the result with the Redis table {@code Xdata} on {@code phone = key}, and write
 * {@code phone|userAgent|URL} lines to {@link #SAVE_PATH}.
 */
public class StructuredStreamingJoinRedis {

	/** Directory watched for new input text files. */
	private static final String SOURCE_PATH = "/test_szj/source_data";

	/**
	 * Output directory of the text sink (despite the "parquet" in its name, the sink format is
	 * {@code text}).
	 *
	 * <p>NOTE(review): this path is also used as the streaming checkpoint location, so checkpoint
	 * files share the directory with the output files. A separate checkpoint directory is
	 * advisable, but switching it on an existing deployment drops the recorded progress — confirm
	 * before changing.
	 */
	private static final String SAVE_PATH = "hdfs://192.168.70.21:9000/test_szj/parquet9";

	/** Separator placed between field values in each output line. */
	private static final String OUTPUT_FIELD_SEPARATOR = "|";

	/**
	 * Builds and starts the streaming query, then blocks until it terminates.
	 *
	 * @param args unused
	 * @throws IllegalStateException if the streaming query terminates with an error; the original
	 *     {@link StreamingQueryException} is attached as the cause so the driver exits as failed
	 */
	public static void main(String[] args) {

		SparkSession spark = SparkSession
				.builder()
				.appName("HDFSJoinRedisApp")
				.config("spark.redis.host", ConfigurationManager.getProperty(Constants.REDIS_IP))
				.config("spark.redis.port", ConfigurationManager.getProperty(Constants.REDIS_PORT))
				.getOrCreate();

		try {
			// Static (batch) side of the join, read from the Redis table "Xdata".
			Dataset<Row> redisDataset = spark
					.read()
					.format("org.apache.spark.sql.redis")
					.option("table", "Xdata")
					.load();

			// Streaming side: each new text file under SOURCE_PATH becomes input rows.
			Dataset<Row> hdfsDataset = spark
					.readStream()
					.option("latestFirst", "false")
					.option("fileNameOnly", "false")
					.text(SOURCE_PATH);

			// Project-specific filtering / column splitting.
			// NOTE(review): presumably this produces the phone, userAgent and URL columns used below.
			Dataset<Row> filterResultDataset = ProcessChain
					.setSourceData(hdfsDataset)
					.addProcess(new RuleActionProcessImpl())
					.addProcess(new SingleColum2MutiProcessImpl())
					.execute();

			// NOTE(review): assumes the Redis Dataset exposes its key as a column named "key";
			// verify how the Xdata key column is mapped (spark-redis "key.column" option).
			StreamingQuery query = filterResultDataset
					.join(redisDataset, expr("phone = key"))
					.select(col("phone"), col("userAgent"), col("URL"))
					.map((MapFunction<Row, String>) StructuredStreamingJoinRedis::toOutputLine,
							Encoders.STRING())
					.writeStream()
					.outputMode(OutputMode.Append())
					.format("text")
					.option("checkpointLocation", SAVE_PATH)
					.option("path", SAVE_PATH)
					.start();

			// Block the driver so the query keeps running.
			query.awaitTermination();
		} catch (StreamingQueryException e) {
			// Propagate instead of swallowing, so a failed query is not reported as success.
			throw new IllegalStateException("Streaming query terminated with an error", e);
		} finally {
			spark.stop();
		}
	}

	/**
	 * Formats one joined row as a single output line.
	 *
	 * <p>Field values are joined directly with {@link #OUTPUT_FIELD_SEPARATOR}; commas inside a
	 * value are preserved. (The previous Row.toString()-then-replace approach also turned such
	 * commas into separators.) Null fields are written as {@code "null"}.
	 *
	 * @param row a row with the selected columns, in output order
	 * @return the field values joined by {@link #OUTPUT_FIELD_SEPARATOR}
	 */
	private static String toOutputLine(Row row) {
		return row.mkString(OUTPUT_FIELD_SEPARATOR);
	}

}